Data Engineering Code Examples

Real, working code snippets for SQL, Python, and PySpark. Copy them and use them directly in your projects.

SQL Code Examples

Practical SQL scripts for data engineering, ETL, and data analysis

Incremental Data Load

SQL Snowflake ETL

Use case: Incrementally loading data into Snowflake with a MERGE statement. Optimizes performance and reduces cost. A Python driver sketch follows the SQL.

  • Handles updates and inserts
  • Minimizes data processing
  • Built-in audit logging
incremental_load.sql
-- Incremental data load with a MERGE statement
-- Written for Snowflake; BigQuery and Synapse need minor syntax changes (e.g. DATEADD, temp table DDL)

-- Step 1: Create a temporary staging table
CREATE OR REPLACE TEMPORARY TABLE staging_sales AS
SELECT 
    order_id,
    customer_id,
    product_id,
    quantity,
    amount,
    order_date,
    'batch_' || CURRENT_DATE() AS batch_id,
    CURRENT_TIMESTAMP() AS loaded_at
FROM external_sales_source
WHERE order_date >= DATEADD(day, -1, CURRENT_DATE());

-- Step 2: MERGE the staging data into the target (incremental load)
MERGE INTO production.sales_fact AS target
USING staging_sales AS source
    ON target.order_id = source.order_id
WHEN MATCHED THEN
    UPDATE SET
        target.quantity = source.quantity,
        target.amount = source.amount,
        target.updated_at = CURRENT_TIMESTAMP(),
        target.is_active = TRUE
WHEN NOT MATCHED THEN
    INSERT (
        order_id,
        customer_id,
        product_id,
        quantity,
        amount,
        order_date,
        created_at,
        updated_at,
        is_active
    ) VALUES (
        source.order_id,
        source.customer_id,
        source.product_id,
        source.quantity,
        source.amount,
        source.order_date,
        CURRENT_TIMESTAMP(),
        CURRENT_TIMESTAMP(),
        TRUE
    );

-- Step 3: Soft delete old records (optional)
UPDATE production.sales_fact
SET is_active = FALSE,
    updated_at = CURRENT_TIMESTAMP()
WHERE order_date < DATEADD(month, -13, CURRENT_DATE())
  AND is_active = TRUE;

-- Step 4: Audit logging
INSERT INTO audit.data_load_log (
    table_name,
    rows_inserted,
    rows_updated,
    rows_deleted,
    load_timestamp,
    batch_id
)
SELECT
    'sales_fact' AS table_name,
    -- A staging table has no metadata$action column (that is a stream feature), so read
    -- the MERGE's own result set via RESULT_SCAN. LAST_QUERY_ID(-2) points back past the
    -- soft-delete UPDATE in step 3 to the MERGE in step 2; adjust the offset if you
    -- change the statement order. Column names are as Snowflake returns them for a MERGE.
    "number of rows inserted" AS rows_inserted,
    "number of rows updated" AS rows_updated,
    0 AS rows_deleted,
    CURRENT_TIMESTAMP() AS load_timestamp,
    'batch_' || CURRENT_DATE() AS batch_id
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-2)));
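
The MERGE script above can also be run from an orchestrator instead of being pasted into a worksheet. Below is a minimal sketch that executes the file from Python, assuming the snowflake-connector-python package; the connection parameters, the warehouse/database names, and the file name run_incremental_load.py are illustrative, not part of the original example.
run_incremental_load.py
"""Execute incremental_load.sql against Snowflake (illustrative sketch)."""

import os

import snowflake.connector  # assumes snowflake-connector-python is installed


def run_incremental_load(sql_path: str = "incremental_load.sql") -> None:
    # Illustrative connection settings; in practice pull these from a secrets manager.
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse="ETL_WH",
        database="ANALYTICS",
        schema="PRODUCTION",
    )
    try:
        with open(sql_path, "r") as f:
            sql_text = f.read()
        # execute_string runs the script statement by statement and returns a cursor per statement
        for cursor in conn.execute_string(sql_text):
            print(f"Statement finished, affected rows: {cursor.rowcount}")
    finally:
        conn.close()


if __name__ == "__main__":
    run_incremental_load()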

Data Quality Checks

SQL Data Quality Validation

Use case: Comprehensive data quality checks for a customer database. Verifies completeness, validity, and consistency. A Python automation sketch follows the SQL.

  • NULL value checks
  • Domain validation
  • Referential integrity
  • Data profiling
data_quality_checks.sql
-- Data quality checks for customer data
-- Written for PostgreSQL (the ~ regex operator is PostgreSQL-specific; MySQL and SQL Server need REGEXP/LIKE equivalents)

-- 1. Completeness Checks (NULL/Empty values)
WITH completeness_checks AS (
    SELECT 
        'customers' AS table_name,
        COUNT(*) AS total_rows,
        COUNT(customer_id) AS customer_id_not_null,
        COUNT(first_name) AS first_name_not_null,
        COUNT(email) AS email_not_null,
        COUNT(CASE WHEN email LIKE '%@%.%' THEN 1 END) AS valid_email_format,
        COUNT(CASE WHEN phone ~ '^[0-9+\-\s()]{10,20}$' THEN 1 END) AS valid_phone
    FROM production.customers
    WHERE is_active = TRUE
),
completeness_pct AS (
    SELECT 
        table_name,
        total_rows,
        ROUND(100.0 * customer_id_not_null / total_rows, 2) AS customer_id_complete_pct,
        ROUND(100.0 * first_name_not_null / total_rows, 2) AS first_name_complete_pct,
        ROUND(100.0 * email_not_null / total_rows, 2) AS email_complete_pct,
        ROUND(100.0 * valid_email_format / email_not_null, 2) AS email_valid_pct,
        ROUND(100.0 * valid_phone / total_rows, 2) AS phone_valid_pct
    FROM completeness_checks
)

SELECT * FROM completeness_pct

-- 2. Uniqueness Checks
UNION ALL
SELECT 
    'customers_uniqueness' AS table_name,
    total_rows,
    NULL AS customer_id_complete_pct,
    NULL AS first_name_complete_pct,
    NULL AS email_complete_pct,
    NULL AS email_valid_pct,
    ROUND(100.0 * unique_emails / total_rows, 2) AS email_unique_pct
FROM (
    SELECT 
        COUNT(*) AS total_rows,
        COUNT(DISTINCT email) AS unique_emails
    FROM production.customers
    WHERE email IS NOT NULL
) t

-- 3. Referential Integrity Check (orphaned orders; the count appears in the total_rows column)
UNION ALL
SELECT 
    'referential_integrity' AS table_name,
    COUNT(*) AS orphaned_records,
    NULL, NULL, NULL, NULL,
    ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM production.orders), 2) AS orphaned_pct
FROM production.orders o
LEFT JOIN production.customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL

-- 4. Business Rule Validation
UNION ALL
SELECT 
    'business_rules' AS table_name,
    COUNT(*) AS rule_violations,
    NULL, NULL, NULL, NULL,
    ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM production.orders), 2) AS violation_pct
FROM production.orders o
JOIN production.customers c ON o.customer_id = c.customer_id
WHERE o.order_amount < 0 
   OR o.order_date > CURRENT_DATE
   OR (c.country = 'NL' AND o.order_amount > 10000);
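
Checks like these pay off when they run automatically and stop the pipeline on bad data. Below is a minimal sketch that executes the query above from Python and enforces a completeness threshold, assuming pandas and SQLAlchemy with a PostgreSQL driver; the connection string, the 95% threshold, and the file name data_quality_gate.py are illustrative.
data_quality_gate.py
"""Run data_quality_checks.sql and fail the pipeline when thresholds are breached (sketch)."""

import pandas as pd
from sqlalchemy import create_engine  # assumes SQLAlchemy + psycopg2 are installed

# Illustrative connection string; use real credentials from a secret store.
engine = create_engine("postgresql+psycopg2://etl_user:secret@db-host:5432/analytics")


def run_quality_gate(sql_path: str = "data_quality_checks.sql", threshold: float = 95.0) -> None:
    with open(sql_path, "r") as f:
        query = f.read()

    # The unioned query returns one row per check; column names come from the first branch.
    report = pd.read_sql(query, engine)
    print(report.to_string(index=False))

    completeness = report[report["table_name"] == "customers"].iloc[0]
    failing = [
        c for c in ("customer_id_complete_pct", "email_complete_pct", "email_valid_pct")
        if completeness[c] < threshold
    ]
    if failing:
        raise ValueError(f"Data quality gate failed for: {failing}")


if __name__ == "__main__":
    run_quality_gate()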

Python Code Examples

Python scripts for data processing, ETL pipelines, and data quality

ETL Pipeline Template

Python Pandas ETL

Use case: Complete ETL pipeline template with error handling, logging, and configuration management.

  • Configuration from YAML (a sample config sketch follows the code)
  • Error handling with a custom exception class
  • Logging to both console and file
  • Modular design
etl_pipeline.py
#!/usr/bin/env python3
"""
ETL Pipeline Template for Data Engineering
"""

import yaml
import logging
from datetime import datetime
from typing import Dict, Any
import pandas as pd
from pathlib import Path

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('etl_pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class ETLError(Exception):
    """Custom exception for ETL errors"""
    pass

class ETLPipeline:
    """Base ETL pipeline class"""
    
    def __init__(self, config_path: str = "config/etl_config.yaml"):
        self.config = self.load_config(config_path)
        self.stats = {
            'start_time': None,
            'end_time': None,
            'rows_processed': 0,
            'rows_failed': 0,
            'errors': []
        }
        
    def load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from a YAML file"""
        try:
            with open(config_path, 'r') as f:
                config = yaml.safe_load(f)
            logger.info(f"Config loaded from {config_path}")
            return config
        except Exception as e:
            logger.error(f"Error loading config: {e}")
            raise ETLError(f"Config load failed: {e}")
    
    def extract(self) -> pd.DataFrame:
        """Extract data from the source"""
        try:
            source_type = self.config['source']['type']
            source_path = self.config['source']['path']
            
            logger.info(f"Extracting data from {source_type}: {source_path}")
            
            if source_type == 'csv':
                df = pd.read_csv(source_path, dtype=str)
            elif source_type == 'parquet':
                df = pd.read_parquet(source_path)
            elif source_type == 'json':
                df = pd.read_json(source_path, lines=True)
            else:
                raise ValueError(f"Unknown source type: {source_type}")

            logger.info(f"Successfully extracted {len(df)} rows")
            return df

        except Exception as e:
            logger.error(f"Extract failed: {e}")
            raise ETLError(f"Extract phase failed: {e}")
    
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform data according to business rules"""
        try:
            transformations = self.config.get('transformations', {})
            
            # Data cleaning
            if transformations.get('drop_duplicates'):
                df = df.drop_duplicates()
                logger.info(f"Duplicates removed, {len(df)} rows remaining")
            
            # Type conversions
            for col, dtype in transformations.get('type_conversions', {}).items():
                if col in df.columns:
                    if dtype == 'datetime':
                        df[col] = pd.to_datetime(df[col], errors='coerce')
                    elif dtype == 'numeric':
                        df[col] = pd.to_numeric(df[col], errors='coerce')
            
            logger.info(f"Transform completed, {len(df)} rows transformed")
            return df

        except Exception as e:
            logger.error(f"Transform failed: {e}")
            raise ETLError(f"Transform phase failed: {e}")
    
    def load(self, df: pd.DataFrame):
        """Load data to the target"""
        try:
            target_type = self.config['target']['type']
            target_path = self.config['target']['path']
            
            logger.info(f"Loading data to {target_type}: {target_path}")

            # Create the target directory if needed
            Path(target_path).parent.mkdir(parents=True, exist_ok=True)
            
            if target_type == 'parquet':
                df.to_parquet(target_path, index=False)
            elif target_type == 'csv':
                df.to_csv(target_path, index=False)
            else:
                raise ValueError(f"Unknown target type: {target_type}")

            logger.info(f"Successfully loaded {len(df)} rows to {target_path}")

        except Exception as e:
            logger.error(f"Load failed: {e}")
            raise ETLError(f"Load phase failed: {e}")
    
    def run(self):
        """Run the complete ETL pipeline"""
        self.stats['start_time'] = datetime.now()
        logger.info("=" * 50)
        logger.info("ETL pipeline started")
        logger.info("=" * 50)
        
        try:
            # Extract
            raw_data = self.extract()
            
            # Transform
            transformed_data = self.transform(raw_data)
            
            # Load
            self.load(transformed_data)
            
            # Update statistics
            self.stats['rows_processed'] = len(transformed_data)
            self.stats['end_time'] = datetime.now()
            duration = self.stats['end_time'] - self.stats['start_time']
            
            logger.info("=" * 50)
            logger.info("ETL pipeline completed successfully!")
            logger.info(f"Duration: {duration}")
            logger.info(f"Rows processed: {self.stats['rows_processed']}")
            logger.info("=" * 50)
            
            return True
            
        except ETLError as e:
            logger.error(f"ETL pipeline failed: {e}")
            self.stats['errors'].append(str(e))
            return False
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            self.stats['errors'].append(str(e))
            return False

if __name__ == "__main__":
    # Example usage
    pipeline = ETLPipeline("config/etl_config.yaml")
    success = pipeline.run()
    
    if success:
        print("ETL pipeline completed successfully!")
    else:
        print("ETL pipeline failed, see the logs for details")
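
The pipeline above expects a configuration file at config/etl_config.yaml. The keys follow directly from ETLPipeline.extract(), transform(), and load(); the concrete paths and column names below are illustrative. A minimal sketch that writes such a file:
generate_etl_config.py
"""Generate a minimal config/etl_config.yaml for ETLPipeline (illustrative values)."""

from pathlib import Path

import yaml

config = {
    "source": {
        "type": "csv",                      # 'csv', 'parquet' or 'json', see ETLPipeline.extract()
        "path": "data/raw/sales.csv",       # illustrative path
    },
    "transformations": {
        "drop_duplicates": True,
        "type_conversions": {               # column -> 'datetime' or 'numeric'
            "order_date": "datetime",
            "amount": "numeric",
        },
    },
    "target": {
        "type": "parquet",                  # 'parquet' or 'csv', see ETLPipeline.load()
        "path": "data/processed/sales.parquet",
    },
}

Path("config").mkdir(parents=True, exist_ok=True)
with open("config/etl_config.yaml", "w") as f:
    yaml.safe_dump(config, f, sort_keys=False)
print("Wrote config/etl_config.yaml")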

PySpark Code Examples

PySpark scripts for big data processing and distributed computing

Performance Optimization

PySpark Databricks Optimization

Use case: Advanced performance tuning for PySpark jobs. Optimizes partitioning, joins, and query execution.

  • Adaptive Query Execution and partition coalescing
  • Broadcast joins for small tables (see the sketch after the code)
  • Delta Lake optimizations (see the sketch after the code)
  • Adaptive repartitioning based on data size
performance_optimization.py
"""
PySpark Performance Optimization Template
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, sum as spark_sum

# Initialize Spark session with optimizations
spark = (
    SparkSession.builder
    .appName("PerformanceOptimization")
    # Adaptive Query Execution: let Spark re-optimize the plan at runtime
    .config("spark.sql.adaptive.enabled", "true")
    # Automatically merge small shuffle partitions
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Broadcast tables up to 100 MB in joins (default is 10 MB)
    .config("spark.sql.autoBroadcastJoinThreshold", "104857600")
    .getOrCreate()
)

class SparkOptimizer:
    """Spark performance optimizer"""
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def repartition_optimally(self, df, partition_column=None, num_partitions=None):
        """
        Repartition a DataFrame optimally for downstream processing
        """
        current_partitions = df.rdd.getNumPartitions()
        
        if partition_column:
            # Partition on a column so downstream joins/aggregations on it avoid extra shuffles
            df = df.repartition(num_partitions or 200, partition_column)
        elif num_partitions:
            # Explicit number of partitions
            df = df.repartition(num_partitions)
        else:
            # Adaptive partitioning based on data size
            # Rough estimate: ~100 bytes per row, aiming for ~128 MB per partition (note: triggers a count)
            data_size_mb = df.count() * 100 / (1024 * 1024)
            optimal_partitions = max(1, int(data_size_mb / 128))
            df = df.repartition(optimal_partitions)
        
        print(f"Repartitioned from {current_partitions} to {df.rdd.getNumPartitions()} partitions")
        return df

# Example: e-commerce analytics pipeline
def ecommerce_analytics_pipeline():
    """End-to-end analytics pipeline with performance optimizations"""
    
    optimizer = SparkOptimizer(spark)
    
    # 1. Read the source data
    print("📥 Step 1: Loading data...")
    orders_df = spark.read.format("delta").load("/mnt/data/orders")
    customers_df = spark.read.format("delta").load("/mnt/data/customers")
    
    # 2. Filter and transform
    print("🔄 Step 2: Transforming data...")
    recent_orders = orders_df.filter(col("order_date") >= "2024-01-01")
    
    # Repartition on the join key for better join performance
    recent_orders = optimizer.repartition_optimally(recent_orders, "customer_id")
    customers_df = optimizer.repartition_optimally(customers_df, "customer_id")
    
    # 3. Join and aggregate
    print("🤝 Step 3: Joins and aggregations...")
    enriched_orders = recent_orders.join(customers_df, "customer_id", "inner")
    
    customer_metrics = enriched_orders.groupBy("customer_id", "country").agg(
        count("*").alias("total_orders"),
        spark_sum("order_amount").alias("total_spent"),
        avg("order_amount").alias("avg_order_value")
    )
    
    print(f"✅ Pipeline completed successfully! Results: {customer_metrics.count()} rows")
    
    return customer_metrics

if __name__ == "__main__":
    print("🚀 PySpark Performance Optimization Demo")
    print("=" * 50)
    
    # Run the pipeline
    try:
        results = ecommerce_analytics_pipeline()
        
        # Show sample results
        print("\n📋 Sample results:")
        results.show(10, truncate=False)
        
    except Exception as e:
        print(f"❌ Error: {e}")
    
    finally:
        spark.stop()
        print("\n🎯 Spark session stopped")
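
The broadcast join and Delta Lake bullets above are not shown explicitly in the pipeline itself. Below is a minimal sketch of both, assuming a small customers table and a Delta Lake environment (OPTIMIZE ... ZORDER BY requires Databricks or a Delta build that supports it); the table paths and the file name broadcast_and_delta.py are illustrative.
broadcast_and_delta.py
"""Explicit broadcast join and Delta table maintenance (illustrative sketch)."""

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("BroadcastAndDeltaDemo").getOrCreate()

orders_df = spark.read.format("delta").load("/mnt/data/orders")        # illustrative paths
customers_df = spark.read.format("delta").load("/mnt/data/customers")

# Force a broadcast join: the small customers table is shipped to every executor,
# so the large orders table is joined without shuffling it across the cluster.
enriched = orders_df.join(broadcast(customers_df), "customer_id", "inner")
enriched.explain()  # the physical plan should show a BroadcastHashJoin

# Delta Lake maintenance: compact small files and co-locate rows on a common filter column.
spark.sql("OPTIMIZE delta.`/mnt/data/orders` ZORDER BY (order_date)")

# Remove data files no longer referenced by the table (default retention period applies).
spark.sql("VACUUM delta.`/mnt/data/orders`")

spark.stop()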